package com.xm.danmaku.handler;

import cn.hutool.core.lang.Assert;
import com.xm.danmaku.dto.DanmakuMessage;
import com.xm.danmaku.service.DanmakuPublisher;
import com.xm.danmaku.service.WebSocketSessionManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.Flow;

@Slf4j
@Service
@RequiredArgsConstructor
public class DanmakuWebSocketHandler extends TextWebSocketHandler {

    private final WebSocketSessionManager sessionManager;

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) {
        // 解析客户端消息
        DanmakuMessage danmaku = DanmakuMessage.fromJson(textMessage.getPayload());
        String roomId = getRoomIdFromSession(session);
        danmaku.setRoomId(roomId);

        // 验证消息
        validateDanmaku(danmaku);

        // todo:保存到日志

        // 发布到消息中心
        DanmakuPublisher.getInstance().publish(danmaku);
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        String roomId = getRoomIdFromSession(session);
        sessionManager.addSession(roomId, session);
        // 为该会话创建订阅者
        Flow.Subscriber<DanmakuMessage> subscriber = createSubscriber(session);
        Flow.Subscription subscription = DanmakuPublisher.getInstance()
                .subscribe(roomId, subscriber);

        // 将会话与订阅关系保存,方便在连接关闭时取消订阅
        session.getAttributes().put("subscription", subscription);

    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        String roomId = getRoomIdFromSession(session);
        sessionManager.removeSession(roomId, session);

        // 取消订阅
        Flow.Subscription subscription = (Flow.Subscription)
                session.getAttributes().get("subscription");
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /**
     * 创建WebSocket订阅者
     * @param session WebSocket会话
     * @return
     */
    private Flow.Subscriber<DanmakuMessage> createSubscriber(WebSocketSession session) {
        return new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(Long.MAX_VALUE); // 不限制请求数量
            }

            @Override
            public void onNext(DanmakuMessage danmaku) {
                try {
                    if (session.isOpen()) {
                        session.sendMessage(new TextMessage(danmaku.toJson()));
                    }
                } catch (IOException e) {
                    subscription.cancel();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // 处理错误
                log.info("WebSocket错误: {}", throwable.getMessage());
            }

            @Override
            public void onComplete() {
                // 发布者关闭
                log.info("WebSocket连接已关闭: {}", session.getId());
            }
        };
    }

    /**
     * 从WebSocket会话中获取房间ID
     * @param session WebSocket会话
     * @return 房间ID
     */
    private String getRoomIdFromSession(WebSocketSession session) {
        // 从URI获取roomId，如 ws://localhost:8080/ws/danmaku?roomId=123
        String query = session.getUri().getQuery();
        return query.split("=")[1];
    }

    /**
     * 验证弹幕消息
     * @param danmaku 弹幕消息
     */
    private void validateDanmaku(DanmakuMessage danmaku) {
        // 验证逻辑...
        Assert.notEmpty(danmaku.getRoomId(), "房间ID不能为空");
        Assert.notEmpty(danmaku.getContent(), "弹幕内容不能为空");
        Assert.notEmpty(danmaku.getUserId(), "用户ID不能为空");
    }
}
